// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/server/diagnostics_log.h"

#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <sparsehash/dense_hash_set>

#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/array_view.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/random.h"
#include "kudu/util/random_util.h"
#include "kudu/util/rolling_log.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"

using std::pair;
using std::priority_queue;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;

// GLog already implements symbolization. Just import their hidden symbol.
namespace google {
// Symbolizes a program counter.  On success, returns true and writes the
// symbol name to "out".  The symbol name is demangled if possible
// (supports symbols generated by GCC 3.x or newer).  Otherwise,
// returns false.
//
// The caller in this file passes the byte size of the "out" buffer as
// 'out_size'.
bool Symbolize(void *pc, char *out, int out_size);
}

// Controls how often thread stacks are written to the diagnostics log:
//   > 0 : periodic dumps at a randomized interval whose mean is this value.
//   = 0 : no periodic dumps; on-demand dumps are still logged.
//   < 0 : stack trace logging is disabled entirely.
// The flag is tagged 'runtime', so its value may change while the server is
// running; code that reads it should not assume two reads return the same value.
DEFINE_int32(diagnostics_log_stack_traces_interval_ms, 60000,
             "The interval at which the server will write a snapshot of its thread stacks "
             "to the diagnostics log. In fact, the server will log at a random interval "
             "between zero and twice the configured value to avoid biasing samples towards "
             "periodic processes which happen exactly on some particular schedule. If this "
             "is set to 0, stack traces will not be periodically logged, but will still be "
             "logged on events such as queue overflows. Setting this to a negative value "
             "disables stack trace logging entirely.");
TAG_FLAG(diagnostics_log_stack_traces_interval_ms, runtime);
TAG_FLAG(diagnostics_log_stack_traces_interval_ms, experimental);

namespace kudu {
namespace server {

// Remembers which code addresses have already had their symbol written to
// the current log file, so that each symbol is only emitted once per file.
class DiagnosticsLog::SymbolSet {
 public:
  SymbolSet() {
    // dense_hash_set needs a reserved 'empty' key before first use; nullptr
    // is used for that purpose.
    set_.set_empty_key(nullptr);
  }

  // Records 'addr' as seen.
  //
  // Returns true if 'addr' was newly added, false if it was already present.
  // A null address is reported as already present: it cannot be stored
  // (it is the 'empty' key) and will never have a real symbol anyway.
  bool Add(void* addr) {
    if (!addr) {
      return false;
    }
    return InsertIfNotPresent(&set_, addr);
  }

  // Forgets every recorded address if 'roll_count' differs from the roll
  // count seen on the previous call; otherwise does nothing.
  void ResetIfLogRolled(int roll_count) {
    if (roll_count_ == roll_count) {
      return;
    }
    roll_count_ = roll_count;
    set_.clear();
  }

 private:
  // Roll count observed by the most recent ResetIfLogRolled() call.
  int roll_count_ = 0;
  // Addresses recorded since the last reset.
  google::dense_hash_set<void*> set_;
};

DiagnosticsLog::DiagnosticsLog(string log_dir,
                               string program_name,
                               MetricRegistry* metric_registry) :
    log_dir_(std::move(log_dir)),
    program_name_(std::move(program_name)),
    metric_registry_(metric_registry),
    wake_(&lock_),
    metrics_log_interval_(MonoDelta::FromSeconds(60)),
    symbols_(new SymbolSet()) {
}
// Stops the logging thread (if one is running) before the object goes away.
// Stop() returns immediately when no thread was started.
DiagnosticsLog::~DiagnosticsLog() {
  Stop();
}

// Changes the interval used when scheduling metrics dumps. The new value is
// stored under 'lock_'; it is picked up the next time a metrics wakeup is
// computed, so an already-scheduled wakeup is not moved.
void DiagnosticsLog::SetMetricsLogInterval(MonoDelta interval) {
  MutexLock guard(lock_);
  metrics_log_interval_ = interval;
}

// Requests an immediate stack dump tagged with 'reason'.
//
// The reason is stored (replacing any request that has not been serviced
// yet) and the logging thread is signalled; the dump itself happens on that
// thread, not in the caller.
void DiagnosticsLog::DumpStacksNow(std::string reason) {
  MutexLock guard(lock_);
  dump_stacks_now_reason_ = std::move(reason);
  wake_.Signal();
}


// Opens the rolling "diagnostics" log and launches the "diag-logger" thread.
//
// Returns a non-OK status if the log cannot be opened or the thread cannot
// be created. In the latter case the freshly opened log is released again so
// that a failed Start() does not leave it open.
Status DiagnosticsLog::Start() {
  unique_ptr<RollingLog> new_log(
      new RollingLog(Env::Default(), log_dir_, program_name_, "diagnostics"));
  RETURN_NOT_OK_PREPEND(new_log->Open(), "unable to open diagnostics log");
  log_ = std::move(new_log);

  Status thread_status = Thread::Create(
      "server", "diag-logger", [this]() { this->RunThread(); }, &thread_);
  if (thread_status.ok()) {
    return thread_status;
  }

  // Thread creation failed: drop the log we just opened.
  log_.reset();
  return thread_status;
}

// Shuts down the logging thread and closes the log. Does nothing if no
// thread is running. A failure to close the log is only logged as a warning.
void DiagnosticsLog::Stop() {
  if (!thread_) {
    return;
  }

  // Raise the stop flag and wake the logger under the lock so that it
  // observes the flag when it re-checks its loop condition.
  {
    MutexLock guard(lock_);
    stop_ = true;
    wake_.Signal();
  }

  // Once the thread has been joined nobody else reads 'stop_', so it can be
  // cleared without taking the lock.
  thread_->Join();
  stop_ = false;
  thread_.reset();

  WARN_NOT_OK(log_->Close(), "Unable to close diagnostics log");
}

// Returns the time at which the next wakeup of the given 'type' should occur.
//
// STACKS:  if the stack-trace interval flag is positive, a uniformly random
//          delay in [0, 2 * interval) milliseconds from now; otherwise five
//          seconds from now, purely so the thread can re-check the flag.
// METRICS: now plus 'metrics_log_interval_'.
//
// Reads 'metrics_log_interval_'; the callers in this file invoke it while
// holding 'lock_'.
MonoTime DiagnosticsLog::ComputeNextWakeup(DiagnosticsLog::WakeupType type) const {
  switch (type) {
    case WakeupType::STACKS: {
      // The flag is runtime-modifiable, so snapshot it once. Testing one read
      // and then using a second read could hand Uniform() a zero range if the
      // flag were changed in between (assumes Uniform() requires a non-zero
      // argument -- see kudu/util/random.h).
      const int32_t interval_ms = FLAGS_diagnostics_log_stack_traces_interval_ms;
      if (interval_ms > 0) {
        // Instead of directly using the configured interval, we use a uniform random
        // interval whose mean is the configured value. This prevents biasing our stack
        // samples. For example, if there is some background process which happens once a
        // minute, and the user also configured the stacks to once a minute, an operator
        // might incorrectly surmise that the background task was _always_ running.
        // Randomizing the samples avoids such correlations.
        Random rng(GetRandomSeed32());
        // Double the interval in unsigned arithmetic: 2 * INT32_MAX still fits in
        // a uint32_t, whereas doubling as a signed 32-bit value could overflow.
        const uint32_t range_ms = static_cast<uint32_t>(interval_ms) * 2;
        const int64_t ms = rng.Uniform(range_ms);
        return MonoTime::Now() + MonoDelta::FromMilliseconds(ms);
      }
      // Periodic stack tracing is disabled. However we still wake up periodically
      // because the flag is runtime-modifiable, and we need to wake up to notice
      // that it might have changed.
      return MonoTime::Now() + MonoDelta::FromSeconds(5);
    }
    case WakeupType::METRICS:
      return MonoTime::Now() + metrics_log_interval_;
  }
  __builtin_unreachable();
}

void DiagnosticsLog::RunThread() {
  MutexLock l(lock_);

  // Set up a priority queue which tracks our future scheduled wake-ups.
  typedef pair<MonoTime, WakeupType> QueueElem;
  priority_queue<QueueElem, vector<QueueElem>, std::greater<QueueElem>> wakeups;
  wakeups.emplace(ComputeNextWakeup(WakeupType::METRICS), WakeupType::METRICS);
  wakeups.emplace(ComputeNextWakeup(WakeupType::STACKS), WakeupType::STACKS);

  while (!stop_) {
    MonoTime next_log = wakeups.top().first;
    wake_.WaitUntil(next_log);

    string reason;
    WakeupType what;
    if (dump_stacks_now_reason_) {
      what = WakeupType::STACKS;
      reason = std::move(*dump_stacks_now_reason_);
      dump_stacks_now_reason_ = boost::none;
    } else if (MonoTime::Now() >= next_log) {
      what = wakeups.top().second;
      reason = "periodic";
      wakeups.pop();
      wakeups.emplace(ComputeNextWakeup(what), what);
    } else {
      // Spurious wakeup, or a stop trigger.
      continue;
    }

    // Unlock the mutex while actually logging metrics or stacks since it's somewhat
    // slow and we don't want to block threads trying to signal us.
    l.Unlock();
    SCOPED_CLEANUP({ l.Lock(); });
    Status s;
    if (what == WakeupType::METRICS) {
      WARN_NOT_OK(LogMetrics(), "Unable to collect metrics to diagnostics log");
    } else if (what == WakeupType::STACKS && FLAGS_diagnostics_log_stack_traces_interval_ms >= 0) {
      WARN_NOT_OK(LogStacks(reason), "Unable to collect stacks to diagnostics log");
    }
  }
}

// Snapshots the stacks of all threads and appends them to the diagnostics log.
//
// Up to two records are written in a single Append() call:
//  1. an optional "symbols" record mapping each not-yet-seen frame address
//     to its symbolized name, and
//  2. a "stacks" record holding 'reason' plus, for each group of threads
//     visited, the thread IDs and the frame addresses of the group's stack.
//
// Returns a non-OK status if the snapshot or the append fails.
Status DiagnosticsLog::LogStacks(const string& reason) {
  StackTraceSnapshot snapshot;
  snapshot.set_capture_thread_names(false);
  RETURN_NOT_OK(snapshot.SnapshotAllStacks());

  std::ostringstream out;
  const MicrosecondsInt64 now_us = GetCurrentTimeMicros();

  // Symbol names tend to be long and highly repetitive, so they are
  // dictionary-encoded: the symbol table is cleared whenever the log file
  // rolls, and within one file an address is written to a 'symbols' record
  // only the first time it is encountered. Later occurrences of that address
  // rely on the earlier record.
  symbols_->ResetIfLogRolled(log_->roll_count());
  vector<std::pair<void*, string>> fresh_symbols;
  snapshot.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
      const StackTrace& trace = group[0].stack;
      for (int idx = 0; idx < trace.num_frames(); ++idx) {
        void* pc = trace.frame(idx);
        if (!symbols_->Add(pc)) {
          // Already emitted in this log file (or a null address).
          continue;
        }
        char symbol_name[1024];
        // The address found on the stack is the return address of the call,
        // not the address of the call instruction itself, so step back one
        // byte before symbolizing.
        if (!google::Symbolize(static_cast<char*>(pc) - 1, symbol_name, sizeof(symbol_name))) {
          // Symbolization failed: leave the address out of the map. Readers
          // of the log will find it missing and should treat it as an
          // unknown symbol.
          continue;
        }
        fresh_symbols.emplace_back(pc, symbol_name);
      }
    });

  if (!fresh_symbols.empty()) {
    out << "I" << FormatTimestampForLog(now_us)
        << " symbols " << now_us << " ";
    JsonWriter symbols_json(&out, JsonWriter::COMPACT);
    symbols_json.StartObject();
    for (const auto& entry : fresh_symbols) {
      symbols_json.String(StringPrintf("%p", entry.first));
      symbols_json.String(entry.second);
    }
    symbols_json.EndObject();
    out << "\n";
  }

  out << "I" << FormatTimestampForLog(now_us) << " stacks " << now_us << " ";
  JsonWriter stacks_json(&out, JsonWriter::COMPACT);
  stacks_json.StartObject();
  stacks_json.String("reason");
  stacks_json.String(reason);
  stacks_json.String("groups");
  stacks_json.StartArray();
  snapshot.VisitGroups([&](ArrayView<StackTraceSnapshot::ThreadInfo> group) {
      stacks_json.StartObject();

      // TODO(todd): should we also output the thread names, perhaps in
      // a sort of dictionary fashion? It's more instructive but in
      // practice the stack traces should usually indicate the work
      // that's being done, anyway, which is enough to tie back to the
      // thread. The TID is smaller and useful for correlating against
      // messages in the normal glog as well.
      stacks_json.String("tids");
      stacks_json.StartArray();
      for (auto& thread_info : group) {
        stacks_json.Int64(thread_info.tid);
      }
      stacks_json.EndArray();

      // Only the first member's stack is written for the whole group.
      stacks_json.String("stack");
      stacks_json.StartArray();
      const StackTrace& trace = group[0].stack;
      for (int idx = 0; idx < trace.num_frames(); ++idx) {
        stacks_json.String(StringPrintf("%p", trace.frame(idx)));
      }
      stacks_json.EndArray();

      stacks_json.EndObject();
    });
  stacks_json.EndArray();   // "groups"
  stacks_json.EndObject();  // top-level object
  out << "\n";

  return log_->Append(out.str());
}

// Appends one "metrics" record to the diagnostics log containing the JSON
// dump of the metric registry, limited to metrics modified in or after
// 'metrics_epoch_'.
//
// Returns a non-OK status if serializing the metrics or appending to the log
// fails; in that case 'metrics_epoch_' is left untouched.
Status DiagnosticsLog::LogMetrics() {
  MetricJsonOptions json_opts;
  json_opts.include_raw_histograms = true;
  json_opts.only_modified_in_or_after_epoch = metrics_epoch_;
  // Skip metrics which have never been incremented. This looks redundant
  // with the epoch filter above, but it also keeps us from dumping a pile of
  // zero data on startup.
  json_opts.include_untouched_metrics = false;
  // Entity attributes aren't that useful in the context of this log, and can
  // always be fetched separately if necessary.
  json_opts.include_entity_attributes = false;

  const MicrosecondsInt64 timestamp_us = GetCurrentTimeMicros();
  std::ostringstream line;
  line << "I" << FormatTimestampForLog(timestamp_us)
       << " metrics " << timestamp_us << " ";

  // Remember the epoch in effect before this dump, then advance the global
  // epoch before serializing the metrics as JSON.
  const int64_t epoch_before_dump = Metric::current_epoch();
  Metric::IncrementEpoch();
  JsonWriter json(&line, JsonWriter::COMPACT);
  RETURN_NOT_OK(metric_registry_->WriteAsJson(&json, json_opts));
  line << "\n";

  RETURN_NOT_OK(log_->Append(line.str()));

  // The next dump should only show metrics that changed after the epoch we
  // just logged.
  //
  // NOTE: this is only advanced once the append has succeeded, so that a
  // failed write does not cause any changes to be skipped.
  metrics_epoch_ = epoch_before_dump + 1;
  return Status::OK();
}


} // namespace server
} // namespace kudu
